package com.siyoumi.app.netty;

import com.siyoumi.app.netty.to.NettyConnectionData;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

//服务端-mqtt
@Slf4j
public abstract class NettyMqttServer {
    @SneakyThrows
    final protected void handle(NettyConnectionData data) {
        int maxFrameSize = 1024;

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.localAddress(new InetSocketAddress(data.getIp(), data.getPort()));
            //
            serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) //连接超时5秒
                    .option(ChannelOption.SO_BACKLOG, 1024) //连接队列大小
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .option(ChannelOption.SO_RCVBUF, 10485760)
            ;

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();

                    //心跳检测，实现超时断开连接
//                    pipeline.addLast(new IdleStateHandler(data.getIdleTimeSeconds(), 0, 0, TimeUnit.SECONDS));
//                    pipeline.addLast(new WebSocketServerCompressionHandler());
//                    pipeline.addLast(new WebSocketServerProtocolHandler(data.getPath(), null, true, maxFrameSize, false,
//                            true));

                    pipeline.addLast(new IdleStateHandler(600, 600, 1200));
                    pipeline.addLast("encoder", MqttEncoder.INSTANCE);
                    pipeline.addLast("decoder", new MqttDecoder());
                    //业务处理逻辑
                    pipeline.addLast(new NettyMqttAuthHandler());
                    //业务处理逻辑
                    pipeline.addLast(data.getHandler());
                }
            });
            Channel channel = serverBootstrap.bind().sync().channel();
            log.info("mqtt 服务启动，ip={},port={},path={}", data.getIp(), data.getPort(), data.getPath());
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
